home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / tcop / pfrag.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  37.7 KB  |  1,303 lines

  1. /* ----------------------------------------------------------------
  2.  *   FILE
  3.  *    pfrag.c
  4.  *    
  5.  *   DESCRIPTION
  6.  *    POSTGRES process query command code
  7.  *
  8.  *   INTERFACE ROUTINES
  9.  *    ProcessQuery
  10.  *
  11.  *   NOTES
  12.  *
  13.  *   IDENTIFICATION
  14.  *    $Header: /private/postgres/src/tcop/RCS/pfrag.c,v 1.33 1992/07/04 04:38:27 mer Exp $
  15.  * ----------------------------------------------------------------
  16.  */
  17.  
  18. #include <signal.h>
  19. #include <math.h>
  20. #include "tmp/postgres.h"
  21. #include "tcop/tcopdebug.h"
  22. #include "tcop/slaves.h"
  23. #include "nodes/pg_lisp.h"
  24. #include "nodes/plannodes.h"
  25. #include "nodes/plannodes.a.h"
  26. #include "nodes/execnodes.h"
  27. #include "nodes/execnodes.a.h"
  28. #include "executor/execmisc.h"
  29. #include "executor/x_execinit.h"
  30. #include "executor/x_hash.h"
  31. #include "tcop/dest.h"
  32. #include "tmp/portal.h"
  33. #include "commands/command.h"
  34. #include "parser/parsetree.h"
  35. #include "storage/lmgr.h"
  36. #include "catalog/pg_relation.h"
  37. #include "utils/lsyscache.h"
  38. #include "utils/log.h"
  39. #include "utils/palloc.h"
  40. #include "nodes/relation.h"
  41. #include "lib/copyfuncs.h"
  42. #include "lib/catalog.h"
  43. #include "tcop/pquery.h"
  44.  
  45.  RcsId("$Header: /private/postgres/src/tcop/RCS/pfrag.c,v 1.33 1992/07/04 04:38:27 mer Exp $");
  46.  
  47. int AdjustParallelismEnabled = 1;
  48. extern int NStriping;
  49. static MasterSchedulingInfoData MasterSchedulingInfoD = {-1, NULL, -1, NULL};
  50. extern ParallelismModes ParallelismMode;
  51.  
  52. /* ------------------------------------
  53.  *    FingFragments
  54.  *
  55.  *    find all the plan fragments under plan node, mark the fragments starting
  56.  *    with fragmentNo
  57.  *    plan fragments are obtained by decomposing the plan tree across all
  58.  *    blocking edges, i.e., edges out of Hash nodes and Sort nodes
  59.  * ------------------------------------
  60.  */
  61. static List
  62. FindFragments(parsetree, node, fragmentNo)
  63. List parsetree;
  64. Plan node;
  65. int fragmentNo;
  66. {
  67.     List subFragments = LispNil;
  68.     List newFragments;
  69.     Fragment fragment;
  70.  
  71.     set_fragment(node, fragmentNo);
  72.     if (get_lefttree(node) != NULL) 
  73.       if (ExecIsHash(get_lefttree(node))||ExecIsMergeJoin(get_lefttree(node))) {
  74.       /* -----------------------------
  75.        * detected a blocking edge, fragment boundary
  76.        * -----------------------------
  77.        */
  78.           fragment = RMakeFragment();
  79.           set_frag_root(fragment, (Plan)get_lefttree(node));
  80.           set_frag_parent_op(fragment, node);
  81.           set_frag_parallel(fragment, 1);
  82.           set_frag_subtrees(fragment, LispNil);
  83.       set_frag_parsetree(fragment, parsetree);
  84.       set_frag_is_inprocess(fragment, false);
  85.       set_frag_iorate(fragment, 0.0);
  86.           subFragments = nappend1(subFragments, (LispValue)fragment);
  87.         }
  88.        else {
  89.           subFragments = FindFragments(parsetree,get_lefttree(node),fragmentNo);
  90.         }
  91.     if (get_righttree(node) != NULL)
  92.        if (ExecIsHash(get_righttree(node)) || ExecIsSort(get_righttree(node))) {
  93.       /* -----------------------------
  94.        * detected a blocking edge, fragment boundary
  95.        * -----------------------------
  96.        */
  97.           fragment = RMakeFragment();
  98.           set_frag_root(fragment, (Plan)get_righttree(node));
  99.           set_frag_parent_op(fragment, node);
  100.           set_frag_parallel(fragment, 1);
  101.           set_frag_subtrees(fragment, LispNil);
  102.       set_frag_parsetree(fragment, parsetree);
  103.       set_frag_is_inprocess(fragment, false);
  104.       set_frag_iorate(fragment, 0.0);
  105.           subFragments = nappend1(subFragments, (LispValue)fragment);
  106.          }
  107.        else {
  108.          newFragments = FindFragments(parsetree,get_righttree(node),fragmentNo);
  109.          subFragments = nconc(subFragments, newFragments);
  110.         }
  111.     
  112.     return subFragments;
  113. }
  114.  
  115. #define SEQPERTUPTIME    2e-3     /* second */
  116. #define INDPERTUPTIME    0.1
  117. #define DEFPERTUPTIME    4e-3
  118.  
  119. static float
  120. get_pertuptime(plan)
  121. Plan plan;
  122. {
  123.     switch (NodeType(plan)) {
  124.     case classTag(SeqScan):
  125.     return SEQPERTUPTIME;
  126.     case classTag(IndexScan):
  127.     return INDPERTUPTIME;
  128.       }
  129.     return DEFPERTUPTIME;
  130. }
  131.  
  132. #define AVGINDTUPS    5
  133.  
  134. static float
  135. compute_frag_iorate(fragment)
  136. Fragment fragment;
  137. {
  138.     Plan plan;
  139.     int fragmentno;
  140.     float pertupletime;
  141.     Plan p;
  142.     float iorate;
  143.     int tupsize;
  144.  
  145.     plan = get_frag_root(fragment);
  146.     fragmentno = get_fragment(plan);
  147.     pertupletime = 0.0;
  148.     for (;;) {
  149.     pertupletime += get_pertuptime(plan);
  150.     p = get_outerPlan(plan); /* walk down the outer path only for now */
  151.     if (p == NULL || get_fragment(p) != fragmentno)
  152.         break;
  153.     else
  154.         plan = p;
  155.       }
  156.     if (ExecIsSeqScan(plan) || ExecIsScanTemps(plan)) {
  157.     iorate = 1.0/(pertupletime * get_plan_tupperpage(plan));
  158.       }
  159.     else if (ExecIsIndexScan(plan)) {
  160.     iorate = 1.0/(pertupletime * AVGINDTUPS);
  161.       }
  162.     return iorate;
  163. }
  164.  
  165. /* -------------------------------
  166.  *    SetIoRate
  167.  *
  168.  *    compute and set the io rate of each fragment
  169.  * -------------------------------
  170.  */
  171. static void
  172. SetIoRate(fragment)
  173. Fragment fragment;
  174. {
  175.     LispValue x;
  176.     Fragment f;
  177.  
  178.     set_frag_iorate(fragment, compute_frag_iorate(fragment));
  179.     foreach (x, get_frag_subtrees(fragment)) {
  180.     f = (Fragment)CAR(x);
  181.     SetIoRate(f);
  182.       }
  183. }
  184.  
  185. /* --------------------------------
  186.  *    InitialPlanFragments
  187.  *
  188.  *    calls FindFragments() recursively to obtain the initial set of
  189.  *    plan fragments -- the largest possible, further decomposition
  190.  *    might be necessary in DecomposeFragments().
  191.  * --------------------------------
  192.  */
  193. Fragment
  194. InitialPlanFragments(parsetree, plan)
  195. Plan plan;
  196. List parsetree;
  197. {
  198.     Plan node;
  199.     LispValue x, y;
  200.     List fragmentlist;
  201.     List subFragments;
  202.     List newFragmentList;
  203.     int fragmentNo = 0;
  204.     Fragment rootFragment;
  205.     Fragment fragment, frag;
  206.  
  207.     rootFragment = RMakeFragment();
  208.     set_frag_root(rootFragment, plan);
  209.     set_frag_parent_op(rootFragment, NULL);
  210.     set_frag_parallel(rootFragment, 1);
  211.     set_frag_subtrees(rootFragment, LispNil);
  212.     set_frag_parent_frag(rootFragment, NULL);
  213.     set_frag_parsetree(rootFragment, parsetree);
  214.     set_frag_is_inprocess(rootFragment, false);
  215.     set_frag_iorate(rootFragment, 0.0);
  216.  
  217.     fragmentlist = lispCons((LispValue)rootFragment, LispNil);
  218.  
  219.     while (!lispNullp(fragmentlist)) {
  220.     newFragmentList = LispNil;
  221.     foreach (x, fragmentlist) {
  222.        fragment = (Fragment)CAR(x);
  223.        node = get_frag_root(fragment);
  224.        subFragments = FindFragments(parsetree, node, fragmentNo++);
  225.        set_frag_subtrees(fragment, subFragments);
  226.        foreach (y, subFragments) {
  227.           frag = (Fragment)CAR(y);
  228.           set_frag_parent_frag(frag, (FragmentPtr)fragment);
  229.          }
  230.        newFragmentList = nconc(newFragmentList, subFragments);
  231.       }
  232.     fragmentlist = newFragmentList;
  233.       }
  234.     SetIoRate(rootFragment);
  235.     return rootFragment;
  236. }
  237.  
  238. extern int NBuffers;
  239.  
  240. /* -----------------------------
  241.  *    GetCurrentMemSize
  242.  *
  243.  *    get the current amount of available memory
  244.  * -----------------------------
  245.  */
  246. static int
  247. GetCurrentMemSize()
  248. {
  249.    return NBuffers;  /* YYY functionalities to be added later */
  250. }
  251.  
  252. /* -----------------------------
  253.  *    GetCurrentLoadAverage
  254.  *
  255.  *    get the current load average of the system
  256.  * -----------------------------
  257.  */
  258. static float
  259. GetCurrentLoadAverage()
  260. {
  261.     return 0.0;  /* YYY functionalities to be added later */
  262. }
  263.  
  264. /* ------------------------------
  265.  *    GetReadyFragments
  266.  *
  267.  *    get the set of fragments that are ready to fire, i.e., those that
  268.  *    have no children
  269.  * ------------------------------
  270.  */
  271. static List
  272. GetReadyFragments(fragments)
  273. Fragment fragments;
  274. {
  275.     LispValue x;
  276.     Fragment frag;
  277.     List readyFragments = LispNil;
  278.     List readyFrags;
  279.  
  280.     if (lispNullp(get_frag_subtrees(fragments)) && 
  281.     !get_frag_is_inprocess(fragments))
  282.        return lispCons((LispValue)fragments, LispNil);
  283.     foreach (x, get_frag_subtrees(fragments)) {
  284.     frag = (Fragment)CAR(x);
  285.     readyFrags = GetReadyFragments(frag);
  286.     readyFragments = nconc(readyFragments, readyFrags);
  287.       }
  288.     return readyFragments;
  289. }
  290.  
  291. /* ---------------------------------
  292.  *    GetFitFragments
  293.  *
  294.  *    get the set of fragments that can fit in the current available memory
  295.  * ---------------------------------
  296.  */
  297. static List
  298. GetFitFragments(fragmentlist, memsize)
  299. List fragmentlist;
  300. int memsize;
  301. {
  302.     return fragmentlist; /* YYY functionalities to be added later */
  303. }
  304.  
  305. /* ----------------------------------
  306.  *    DecomposeFragments
  307.  *
  308.  *    decompose fragments into smaller fragments to fit in memsize amount
  309.  *    of memory
  310.  * ----------------------------------
  311.  */
  312. static List
  313. DecomposeFragments(fragmentlist, memsize)
  314. List fragmentlist;
  315. int memsize;
  316. {
  317.     return fragmentlist;  /* YYY functionalities to be added later */
  318. }
  319.  
  320. /* -----------------------------------
  321.  *    ChooseToFire
  322.  *
  323.  *    choose among all the ready-to-fire fragments which
  324.  *    to execute in parallel
  325.  * -----------------------------------
  326.  */
  327. static List
  328. ChooseToFire(fragmentlist, memsize)
  329. List fragmentlist;
  330. int memsize;
  331. {
  332.     return lispCons(CAR(fragmentlist), LispNil);   
  333.     /* YYY functionalities to be added later */
  334. }
  335.  
  336. /* ----------------------------------
  337.  *    ChooseFragments
  338.  *
  339.  *    choose the fragments to execute in parallel
  340.  * -----------------------------------
  341.  */
  342. static List
  343. ChooseFragments(fragments, memsize)
  344. Fragment fragments;
  345. int memsize;
  346. {
  347.     List readyFragments;
  348.     List fitFragments;
  349.     List fireFragments;
  350.  
  351.     readyFragments = GetReadyFragments(fragments);
  352.     if (lispNullp(readyFragments)) return LispNil;
  353.     fitFragments = GetFitFragments(readyFragments, memsize);
  354.     if (lispNullp(fitFragments)) {
  355.     fitFragments = DecomposeFragments(fitFragments, memsize);
  356.     if (lispNullp(fitFragments))
  357.        elog(WARN, "memory below hashjoin threshold.");
  358.       }
  359.     fireFragments = ChooseToFire(fitFragments, memsize);
  360.     return fireFragments;
  361. }
  362.  
  363. /* ------------------------------
  364.  *    SetParallelDegree
  365.  *
  366.  *    set the degree of parallelism for fragments in fragmentlist
  367.  * ------------------------------
  368.  */
  369. static void
  370. SetParallelDegree(fragmentlist, nfreeslaves)
  371. List fragmentlist;
  372. int nfreeslaves;
  373. {
  374.     LispValue x;
  375.     Fragment fragment;
  376.     Plan plan;
  377.     int fragmentno;
  378.  
  379.     /* YYY more functionalities to be added later */
  380.     foreach (x,fragmentlist) {
  381.        fragment = (Fragment)CAR(x);
  382.        plan = get_frag_root(fragment);
  383.        fragmentno = get_fragment(plan);
  384.        if (fragmentno < 0) {
  385.            set_frag_parallel(fragment, 1);  /* YYY */
  386.       }
  387.        else {
  388.            set_frag_parallel(fragment, nfreeslaves);  /* YYY */
  389.      }
  390.      }
  391. }
  392.  
  393. /* ---------------------------------
  394.  *    plan_is_parallelizable
  395.  *
  396.  *    returns true if plan is parallelizable, false otherwise
  397.  * ---------------------------------
  398.  */
  399. static bool
  400. plan_is_parallelizable(plan)
  401. Plan plan;
  402. {
  403.     if (plan == NULL)
  404.     return true;
  405.     if (ExecIsMergeJoin(plan))
  406.     return false;
  407.     if (ExecIsSort(plan))
  408.     return false;
  409.     if (ExecIsIndexScan(plan)) {
  410.     List indxqual;
  411.     LispValue x;
  412.     NameData opname;
  413.     Oper op;
  414.     indxqual = get_indxqual((IndexScan)plan);
  415.     if (length(CAR(indxqual)) < 2)
  416.         return false;
  417.     foreach (x, CAR(indxqual)) {
  418.         op = (Oper)CAR(CAR(x));
  419.         opname = get_opname(get_opno(op));
  420.         if (strcmp(&opname, "=") == 0)
  421.         return false;
  422.      }
  423.       }
  424.     if (plan_is_parallelizable(get_outerPlan(plan)))
  425.     return true;
  426.     return false;
  427. }
  428.  
  429. /* ----------------------------------------
  430.  *    nappend1iobound
  431.  *
  432.  *    insert an io-bound fragment into a list in 
  433.  *    descending io rate order.
  434.  * ----------------------------------------
  435.  */
  436. static List
  437. nappend1iobound(ioboundlist, frag)
  438. List ioboundlist;
  439. Fragment frag;
  440. {
  441.     LispValue x;
  442.     Fragment f;
  443.  
  444.     if (lispNullp(ioboundlist))
  445.     return lispCons((LispValue)frag, LispNil);
  446.     f = (Fragment)CAR(ioboundlist);
  447.     if (get_frag_iorate(frag) > get_frag_iorate(f)) {
  448.     return(nconc(lispCons((LispValue)frag, LispNil), ioboundlist));
  449.       }
  450.     else {
  451.     return(nconc(lispCons((LispValue)f, LispNil), 
  452.              nappend1iobound(CDR(ioboundlist), frag)));
  453.       }
  454. }
  455.  
  456. /* ----------------------------------------
  457.  *    nappend1cpubound
  458.  *
  459.  *    insert a cpu-bound fragment into a list in 
  460.  *    ascending io rate order.
  461.  * ----------------------------------------
  462.  */
  463. static List
  464. nappend1cpubound(cpuboundlist, frag)
  465. List cpuboundlist;
  466. Fragment frag;
  467. {
  468.     LispValue x;
  469.     Fragment f;
  470.  
  471.     if (lispNullp(cpuboundlist))
  472.     return lispCons((LispValue)frag, LispNil);
  473.     f = (Fragment)CAR(cpuboundlist);
  474.     if (get_frag_iorate(frag) < get_frag_iorate(f)) {
  475.     return(nconc(lispCons((LispValue)frag, LispNil), cpuboundlist));
  476.       }
  477.     else {
  478.     return(nconc(lispCons((LispValue)f, LispNil), 
  479.              nappend1cpubound(CDR(cpuboundlist), frag)));
  480.       }
  481. }
  482.  
  483. #define DISKBANDWIDTH    60     /* IO per second */
  484.  
  485. /* -------------------------------------
  486.  *    ClassifyFragments
  487.  *
  488.  *    classify fragments into io-bound, cpu-bound, unparallelizable or
  489.  *    parallelism-preset.
  490.  * -------------------------------------
  491.  */
  492. static void
  493. ClassifyFragments(fraglist, ioboundlist, cpuboundlist, unparallelizablelist, presetlist)
  494. List fraglist;
  495. List *ioboundlist, *cpuboundlist, *unparallelizablelist, *presetlist;
  496. {
  497.     LispValue x;
  498.     Fragment f;
  499.     Plan p;
  500.     float iorate;
  501.     float diagonal;
  502.  
  503.     *ioboundlist = LispNil;
  504.     *cpuboundlist = LispNil;
  505.     *unparallelizablelist = LispNil;
  506.     *presetlist = LispNil;
  507.     diagonal = (float)NStriping * DISKBANDWIDTH/(float)GetNumberSlaveBackends();
  508.     foreach (x, fraglist) {
  509.     f = (Fragment)CAR(x);
  510.     p = get_frag_root(f);
  511.     if (!plan_is_parallelizable(p)) {
  512.         *unparallelizablelist = nappend1(*unparallelizablelist, 
  513.                          (LispValue)f);
  514.       }
  515.     else if (!lispNullp(parse_parallel(get_frag_parsetree(f)))) {
  516.         *presetlist = nappend1(*presetlist, (LispValue)f);
  517.       }
  518.     else {
  519.         iorate = get_frag_iorate(f);
  520.         if (iorate > diagonal) {
  521.             *ioboundlist = nappend1iobound(*ioboundlist, f);
  522.           }
  523.         else {
  524.         *cpuboundlist = nappend1cpubound(*cpuboundlist, f);
  525.           }
  526.       }
  527.       }
  528. }
  529.  
  530. /* ---------------------------------
  531.  *    ComputeIoCpuBalancePoint
  532.  *
  533.  *    compute io/cpu balance point of two fragments:
  534.  *    f1, io-bound
  535.  *    f2, cpu-bound
  536.  * ---------------------------------
  537.  */
  538. static void
  539. ComputeIoCpuBalancePoint(f1, f2, x1, x2)
  540. Fragment f1, f2;
  541. int *x1, *x2;
  542. {
  543.     float bandwidth;
  544.     float iorate1, iorate2;
  545.     int nfreeslaves;
  546.  
  547.     nfreeslaves = GetNumberSlaveBackends();
  548.     bandwidth = NStriping * DISKBANDWIDTH;
  549.     iorate1 = get_frag_iorate(f1);
  550.     iorate2 = get_frag_iorate(f2);
  551.     *x1=(int)floor((double)((bandwidth-iorate2*nfreeslaves)/(iorate1-iorate2)));
  552.     *x2=(int)ceil((double)((iorate1*nfreeslaves-bandwidth)/(iorate1-iorate2)));
  553. }
  554.  
  555. /* --------------------------------------
  556.  *    MaxFragParallelism
  557.  *
  558.  *    return the maximum parallelism for a fragment
  559.  *    within the limit of number of free processors and disk bandwidth
  560.  * -------------------------------------
  561.  */
  562. static int
  563. MaxFragParallelism(frag)
  564. Fragment frag;
  565. {
  566.     float ioRate;
  567.     int par;
  568.  
  569.     if (!plan_is_parallelizable(get_frag_root(frag)))
  570.     return 1;
  571.     if (!lispNullp(parse_parallel(get_frag_parsetree(frag))))
  572.     return CInteger(parse_parallel(get_frag_parsetree(frag)));
  573.     ioRate = (float)get_frag_iorate(frag);
  574.     par = MIN((int)floor((double)NStriping*DISKBANDWIDTH/ioRate),
  575.           NumberOfFreeSlaves);
  576.     return par;
  577. }
  578.  
  579. /* --------------------------------------
  580.  *    CurMaxFragParallelism
  581.  *
  582.  *    return the maximum parallelism for a fragment
  583.  *    within the limit of current number of free processors and 
  584.  *    available disk bandwidth
  585.  * -------------------------------------
  586.  */
  587. static int
  588. CurMaxFragParallelism(frag, curbandwidth, nfreeslaves)
  589. Fragment frag;
  590. float curbandwidth;
  591. int nfreeslaves;
  592. {
  593.     float ioRate;
  594.     int par;
  595.  
  596.     if (!plan_is_parallelizable(get_frag_root(frag)))
  597.     return 1;
  598.     if (!lispNullp(parse_parallel(get_frag_parsetree(frag))))
  599.     return CInteger(parse_parallel(get_frag_parsetree(frag)));
  600.     ioRate = (float)get_frag_iorate(frag);
  601.     par = MIN((int)floor((double)curbandwidth/ioRate),
  602.           nfreeslaves);
  603.     return par;
  604. }
  605.  
  606. /* ------------------------
  607.  *    AdjustParallelism
  608.  *
  609.  *    dynamically adjust degrees of parallelism of the fragments that
  610.  *    are already in process to take advantage of the extra processors
  611.  * ------------------------
  612.  */
  613. static void
  614. AdjustParallelism(pardelta, groupid)
  615. int pardelta;
  616. int groupid;
  617. {
  618.     int j;
  619.     int slave;
  620.     int max_curpage;
  621.     int size;
  622.     int oldnproc;
  623.  
  624.     Assert(pardelta != 0);
  625.     SLAVE_elog(DEBUG, "master trying to adjust degrees of parallelism");
  626.     SLAVE1_elog(DEBUG, "master sending signal to process group %d", groupid);
  627.     signalProcGroup(groupid, SIGPARADJ);
  628.     max_curpage = getProcGroupMaxPage(groupid);
  629.     SLAVE1_elog(DEBUG, "master gets maxpage = %d", max_curpage);
  630.     oldnproc = ProcGroupInfoP[groupid].nprocess;
  631.     if (max_curpage == NOPARADJ) {
  632.     /* --------------------------
  633.      *  forget about adjustment to parallelism
  634.      *  in this case -- the fragment is almost finished
  635.      * ---------------------------
  636.      */
  637.     SLAVE_elog(DEBUG, "master changes mind on adjusting parallelism");
  638.     ProcGroupInfoP[groupid].paradjpage = NOPARADJ;
  639.         OneSignalM(&(ProcGroupInfoP[groupid].m1lock), oldnproc);
  640.     return;
  641.       }
  642.     ProcGroupInfoP[groupid].paradjpage = max_curpage + 1; 
  643.                    /* page on which to adjust par. */
  644.     if (pardelta > 0) {
  645.         ProcGroupInfoP[groupid].nprocess += pardelta;
  646.         ProcGroupInfoP[groupid].scounter.count = 
  647.                           ProcGroupInfoP[groupid].nprocess;
  648.         ProcGroupInfoP[groupid].newparallel = ProcGroupInfoP[groupid].nprocess;
  649.         SLAVE2_elog(DEBUG,
  650.             "master signals waiting slaves with adjpage=%d,newpar=%d",
  651.                 ProcGroupInfoP[groupid].paradjpage,
  652.             ProcGroupInfoP[groupid].newparallel);
  653.         OneSignalM(&(ProcGroupInfoP[groupid].m1lock), oldnproc);
  654.         set_frag_parallel(ProcGroupLocalInfoP[groupid].fragment,
  655.               get_frag_parallel(ProcGroupLocalInfoP[groupid].fragment)+
  656.               pardelta);
  657.         ProcGroupSMBeginAlloc(groupid);
  658.         size = sizeofTmpRelDesc(QdGetPlan(ProcGroupInfoP[groupid].queryDesc));
  659.         for (j=0; j<pardelta; j++) {
  660.         if (NumberOfFreeSlaves == 0) {
  661.         elog(WARN, 
  662.              "trying to adjust to too much parallelism: out of slaves");
  663.           }
  664.         slave = getFreeSlave();
  665.         SLAVE2_elog(DEBUG, "master adding slave %d to procgroup %d", 
  666.             slave, groupid);
  667.             SlaveInfoP[slave].resultTmpRelDesc = 
  668.                     (Relation)ProcGroupSMAlloc(size);
  669.             addSlaveToProcGroup(slave, groupid, oldnproc+j);
  670.         V_Start(slave);
  671.           }
  672.         ProcGroupSMEndAlloc();
  673.       }
  674.     else {
  675.         ProcGroupInfoP[groupid].newparallel = 
  676.                  ProcGroupInfoP[groupid].nprocess + pardelta;
  677.         SLAVE2_elog(DEBUG,
  678.             "master signals waiting slaves with adjpage=%d,newpar=%d",
  679.                 ProcGroupInfoP[groupid].paradjpage,
  680.             ProcGroupInfoP[groupid].newparallel);
  681.     ProcGroupInfoP[groupid].dropoutcounter.count = -pardelta;
  682.         OneSignalM(&(ProcGroupInfoP[groupid].m1lock), oldnproc);
  683.       }
  684. }
  685.  
  686. /* ----------------------------------------------------------------
  687.  *    ParallelOptimize
  688.  *    
  689.  *    analyzes plan fragments and determines what fragments to execute
  690.  *    and with how much parallelism
  691.  *    
  692.  * ----------------------------------------------------------------
  693.  */
  694. static List
  695. ParallelOptimize(fragmentlist)
  696. List fragmentlist;
  697. {
  698.     LispValue y;
  699.     Fragment fragment;
  700.     int memAvail;
  701.     float loadAvg;
  702.     List readyFragmentList;
  703.     List flist;
  704.     List ioBoundFragList, cpuBoundFragList, unparallelizableFragList;
  705.     List presetFraglist;
  706.     List newIoBoundFragList, newCpuBoundFragList;
  707.     Fragment f1, f2, f;
  708.     int x1, x2;
  709.     List fireFragmentList;
  710.     int nfreeslaves;
  711.     LispValue k, x;
  712.     int parallel;
  713.     Plan plan;
  714.     bool io_running, cpu_running;
  715.     int curpar;
  716.     int pardelta;
  717.  
  718.     fireFragmentList = LispNil;
  719.     nfreeslaves = NumberOfFreeSlaves;
  720.  
  721.     /* ------------------------------
  722.      *  find those plan fragments that are ready to run, i.e.,
  723.      *  those with all input data ready.
  724.      * ------------------------------
  725.      */
  726.     readyFragmentList = LispNil;
  727.     foreach (y, fragmentlist) {
  728.     fragment = (Fragment)CAR(y);
  729.     flist = GetReadyFragments(fragment);
  730.     readyFragmentList = nconc(readyFragmentList, flist);
  731.       }
  732.     if (ParallelismMode == INTRA_ONLY) {
  733.     f = (Fragment)CAR(readyFragmentList);
  734.     fireFragmentList = lispCons((LispValue)f, LispNil);
  735.     SetParallelDegree(fireFragmentList, MaxFragParallelism(f));
  736.     return fireFragmentList;
  737.       }
  738.     /* -------------------------------
  739.      *  classify the ready fragments into io-bound, cpu-bound and
  740.      *  unparallelizable
  741.      * -------------------------------
  742.      */
  743.     ClassifyFragments(readyFragmentList, &ioBoundFragList, &cpuBoundFragList, 
  744.               &unparallelizableFragList, &presetFraglist);
  745.     fireFragmentList = LispNil;
  746.     /* -------------------------------
  747.      *  take care of the unparallelizable fragments first
  748.      * -------------------------------
  749.      */
  750.     if (!lispNullp(unparallelizableFragList)) {
  751.     fireFragmentList = lispCons(CAR(unparallelizableFragList), LispNil);
  752.         SetParallelDegree(fireFragmentList, 1);
  753.     elog(NOTICE, "nonparallelizable fragment, running sequentially\n");
  754.     return fireFragmentList;
  755.       }
  756.     /* --------------------------------
  757.      * deal with those fragments with parallelism preset
  758.      * --------------------------------
  759.      */
  760.     if (!lispNullp(presetFraglist)) {
  761.     foreach (x, presetFraglist) {
  762.         f = (Fragment)CAR(x);
  763.         k = parse_parallel(get_frag_parsetree(f));
  764.         parallel = CInteger(k);
  765.         SetParallelDegree((y=lispCons((LispValue)f, LispNil)), parallel);
  766.         fireFragmentList = nconc(fireFragmentList, y);
  767.       }
  768.     SLAVE_elog(DEBUG, "executing fragments with preset parallelism.");
  769.     return fireFragmentList;
  770.       }
  771.     /* ---------------------------------
  772.      *  now we deal with the parallelizable plan fragments
  773.      * ---------------------------------
  774.      */
  775.     if (MasterSchedulingInfoD.ioBoundFrag != NULL) {
  776.     f1 = MasterSchedulingInfoD.ioBoundFrag;
  777.     io_running = true;
  778.       }
  779.     else {
  780.     io_running = false;
  781.     if (lispNullp(ioBoundFragList))
  782.         f1 = NULL;
  783.     else
  784.         f1 = (Fragment)CAR(ioBoundFragList);
  785.       }
  786.     if (MasterSchedulingInfoD.cpuBoundFrag != NULL) {
  787.     f2 = MasterSchedulingInfoD.cpuBoundFrag;
  788.     cpu_running = true;
  789.       }
  790.     else {
  791.     cpu_running = false;
  792.     if (lispNullp(cpuBoundFragList))
  793.         f2 = NULL;
  794.     else
  795.         f2 = (Fragment)CAR(cpuBoundFragList);
  796.       }
  797.     if (f1 != NULL && f2 != NULL) {
  798.     if (ParallelismMode != INTER_WO_ADJ || (!io_running && !cpu_running)) {
  799.         ComputeIoCpuBalancePoint(f1, f2, &x1, &x2);
  800.         SLAVE2_elog(DEBUG, "executing two fragments at balance point (%d, %d).",
  801.             x1, x2);
  802.       }
  803.     if (io_running) {
  804.         curpar = get_frag_parallel(f1);
  805.         if (ParallelismMode == INTER_WO_ADJ) {
  806.         int newpar;
  807.         float curband;
  808.         curband = NStriping*DISKBANDWIDTH - curpar*get_frag_iorate(f1);
  809.         newpar = CurMaxFragParallelism(f2, curband, NumberOfFreeSlaves);
  810.         if (newpar == 0) return LispNil;
  811.         fireFragmentList = lispCons((LispValue)f2, LispNil);
  812.         SetParallelDegree(fireFragmentList, newpar);
  813.           }
  814.         else {
  815.         pardelta = x1 - curpar;
  816.         if (pardelta != 0) {
  817.            SLAVE_elog(DEBUG, "adjusting parallelism of io-bound task.");
  818.            AdjustParallelism(pardelta,
  819.                      MasterSchedulingInfoD.ioBoundGroupId);
  820.           }
  821.         fireFragmentList = lispCons((LispValue)f2, LispNil);
  822.         if (pardelta >= 0) {
  823.             SetParallelDegree(fireFragmentList, x2);
  824.           }
  825.         else {
  826.             SetParallelDegree(fireFragmentList, NumberOfFreeSlaves);
  827.           }
  828.           }
  829.         MasterSchedulingInfoD.cpuBoundFrag = f2;
  830.       }
  831.     else if (cpu_running) {
  832.         curpar = get_frag_parallel(f2);
  833.         if (ParallelismMode == INTER_WO_ADJ) {
  834.         int newpar;
  835.         float curband;
  836.         curband = NStriping*DISKBANDWIDTH - curpar*get_frag_iorate(f2);
  837.         newpar = CurMaxFragParallelism(f1, curband, NumberOfFreeSlaves);
  838.         if (newpar == 0) return LispNil;
  839.         fireFragmentList = lispCons((LispValue)f1, LispNil);
  840.         SetParallelDegree(fireFragmentList, newpar);
  841.           }
  842.         else {
  843.         pardelta = x2 - curpar;
  844.         if (pardelta != 0) {
  845.           SLAVE_elog(DEBUG, "adjusting parallelism of cpu-bound task.");
  846.           AdjustParallelism(pardelta,
  847.                     MasterSchedulingInfoD.cpuBoundGroupId);
  848.           }
  849.         fireFragmentList = lispCons((LispValue)f1, LispNil);
  850.         if (pardelta >= 0) {
  851.             SetParallelDegree(fireFragmentList, x1);
  852.           }
  853.         else {
  854.             SetParallelDegree(fireFragmentList, NumberOfFreeSlaves);
  855.           }
  856.           }
  857.         MasterSchedulingInfoD.ioBoundFrag = f1;
  858.       }
  859.     else {
  860.         SetParallelDegree((y=lispCons((LispValue)f1, LispNil)), x1);
  861.         fireFragmentList = nconc(fireFragmentList, y);
  862.         SetParallelDegree((y=lispCons((LispValue)f2, LispNil)), x2);
  863.         fireFragmentList = nconc(fireFragmentList, y);
  864.         MasterSchedulingInfoD.ioBoundFrag = f1;
  865.         MasterSchedulingInfoD.cpuBoundFrag = f2;
  866.       }
  867.       }
  868.     else if (f1 == NULL && f2 != NULL) {
  869.     /* -----------------------------
  870.      *  out of io-bound fragments, use intra-operation parallelism only
  871.      *  for cpu-bound fragments.
  872.      * ------------------------------
  873.      */
  874.     fireFragmentList = lispCons((LispValue)f2, LispNil);
  875.     SetParallelDegree(fireFragmentList, nfreeslaves);
  876.     MasterSchedulingInfoD.cpuBoundFrag = f2;
  877.     SLAVE1_elog(DEBUG, 
  878.             "out of io-bound tasks, running cpu-bound task with parallelism %d",
  879.             nfreeslaves);
  880.       }
  881.     else if (f1 != NULL && f2 == NULL) {
  882.     nfreeslaves = MaxFragParallelism(f1);
  883.     fireFragmentList = lispCons((LispValue)f1, LispNil);
  884.     SetParallelDegree(fireFragmentList, nfreeslaves);
  885.     fireFragmentList = nconc(fireFragmentList, y);
  886.     MasterSchedulingInfoD.ioBoundFrag = f1;
  887.     SLAVE1_elog(DEBUG, 
  888.            "out of cpu-bound tasks, running io-bound task with parallelism %d",
  889.             nfreeslaves);
  890.        }
  891.      return fireFragmentList;
  892. }
  893.  
  894. #define MINHASHTABLEMEMORYKEY    1000
  895. static IpcMemoryKey nextHashTableMemoryKey = 0;
  896.  
  897. /* -------------------------
  898.  *    getNextHashTableMemoryKey
  899.  *
  900.  *    get the next hash table key
  901.  * -------------------------
  902.  */
  903. static IpcMemoryKey
  904. getNextHashTableMemoryKey()
  905. {
  906.     extern int MasterPid;
  907.     return (nextHashTableMemoryKey++ + MINHASHTABLEMEMORYKEY + MasterPid);
  908. }
  909.  
  910. /* ------------------------
  911.  *    sizeofTmpRelDesc
  912.  *
  913.  *    calculate the size of reldesc of the temporary relation of plan
  914.  * ------------------------
  915.  */
  916. static int
  917. sizeofTmpRelDesc(plan)
  918. Plan plan;
  919. {
  920.     List targetList;
  921.     int natts;
  922.     int size;
  923.  
  924.     targetList = get_qptargetlist(plan);
  925.     natts = ExecTargetListLength(targetList);
  926.     /* ------------------
  927.      * see CopyRelDescUsing() in lib/C/copyfuncs.c if you want to know
  928.      * how size if derived.
  929.      * ------------------
  930.      */
  931.     size = sizeof(RelationData) + (natts - 1) * sizeof(TupleDescriptorData) +
  932.        sizeof(RuleLock) + sizeof(RelationTupleFormD) +
  933.        sizeof(LockInfoData) + 
  934.        natts * (sizeof(AttributeTupleFormData) + sizeof(RuleLock)) +
  935.        48; /* some extra for possible LONGALIGN() */
  936.     return size;
  937. }
  938.  
  939. /* ----------------------------------------------------------------
  940.  *    OptimizeAndExecuteFragments
  941.  *
  942.  *    Optimize plan fragments to explore both intra-fragment
  943.  *    and inter-fragment parallelism and execute the "optimal"
  944.  *    parallel plan
  945.  *
  946.  * ----------------------------------------------------------------
  947.  */
  948. void
  949. OptimizeAndExecuteFragments(fragmentlist, destination)
  950. List         fragmentlist;
  951. CommandDest    destination;
  952. {
  953.     LispValue        x;
  954.     int            i;
  955.     List        currentFragmentList;
  956.     Fragment        fragment;
  957.     int            nparallel;
  958.     List        finalResultRelation;
  959.     Plan        plan;
  960.     List        parsetree;
  961.     Plan        parentPlan;
  962.     Fragment        parentFragment;
  963.     int            groupid;
  964.     ProcessNode        *p;
  965.     List        subtrees;
  966.     List        fragQueryDesc;
  967.     HashJoinTable    hashtable;
  968.     int            hashTableMemorySize;
  969.     IpcMemoryKey    hashTableMemoryKey;
  970.     IpcMemoryKey    getNextHashTableMemoryKey();
  971.     ScanTemps        scantempNode;
  972.     Relation        tempRelationDesc;
  973.     List        tempRelationDescList;
  974.     Relation        shmTempRelationDesc;
  975.     List        fraglist;
  976.     CommandDest        dest;
  977.     int            size;
  978.     
  979.     fraglist = fragmentlist;
  980.     while (!lispNullp(fraglist)) {
  981.     /* ------------
  982.      * choose the set of fragments to execute and parallelism
  983.      * for each fragment.
  984.      * ------------
  985.      */
  986.         currentFragmentList = ParallelOptimize(fraglist);
  987.     foreach (x, currentFragmentList) {
  988.        fragment = (Fragment)CAR(x);
  989.        nparallel = get_frag_parallel(fragment);
  990.        plan = get_frag_root(fragment);
  991.        parsetree = get_frag_parsetree(fragment);
  992.        parentFragment = (Fragment)get_frag_parent_frag(fragment);
  993.        finalResultRelation = parse_tree_result_relation(parsetree);
  994.        dest = destination;
  995.        dest = None; /* WWW */
  996.        if (ExecIsHash(plan))  {
  997.           /* ------------
  998.            *  if it is hashjoin, create the hash table
  999.            *  so that the slaves can share it
  1000.            * ------------
  1001.            */
  1002.           hashTableMemoryKey = getNextHashTableMemoryKey();
  1003.           set_hashtablekey((Hash)plan, hashTableMemoryKey);
  1004.           hashtable = ExecHashTableCreate(plan);
  1005.           set_hashtable((Hash)plan, hashtable);
  1006.           hashTableMemorySize = get_hashtablesize((Hash)plan);
  1007.           parse_tree_result_relation(parsetree) = LispNil;
  1008.           }
  1009.        else if (get_fragment(plan) >= 0) {
  1010.           /* ------------
  1011.            *  this means that this an intermediate fragment, so
  1012.            *  the result should be kept in some temporary relation
  1013.            * ------------
  1014.            */
  1015.           /* WWW
  1016.           parse_tree_result_relation(parsetree) =
  1017.           lispCons(lispAtom("intotemp"), LispNil);
  1018.            */
  1019.           dest = None;
  1020.          }
  1021.        /* ---------------
  1022.         * create query descriptor for the fragment
  1023.         * ---------------
  1024.         */
  1025.        fragQueryDesc = CreateQueryDesc(parsetree, plan,
  1026.                        (char *) NULL,
  1027.                        (ObjectId *) NULL,
  1028.                        0, dest);
  1029.  
  1030.        /* ---------------
  1031.         * assign a process group to work on the fragment
  1032.         * ---------------
  1033.         */
  1034.        groupid = getFreeProcGroup(nparallel);
  1035.        if (fragment == MasterSchedulingInfoD.ioBoundFrag)
  1036.            MasterSchedulingInfoD.ioBoundGroupId = groupid;
  1037.        else if (fragment == MasterSchedulingInfoD.cpuBoundFrag)
  1038.            MasterSchedulingInfoD.cpuBoundGroupId = groupid;
  1039.        ProcGroupLocalInfoP[groupid].fragment = fragment;
  1040.        ProcGroupInfoP[groupid].status = WORKING;
  1041.        ProcGroupSMBeginAlloc(groupid);
  1042.        ProcGroupInfoP[groupid].queryDesc = (List)
  1043.             CopyObjectUsing((Node)fragQueryDesc, ProcGroupSMAlloc);
  1044.        size = sizeofTmpRelDesc(plan);
  1045.        for (p = ProcGroupLocalInfoP[groupid].memberProc;
  1046.         p != NULL;
  1047.         p = p->next) {
  1048.            SlaveInfoP[p->pid].resultTmpRelDesc = 
  1049.          (Relation)ProcGroupSMAlloc(size);
  1050.           }
  1051.        ProcGroupSMEndAlloc();
  1052.        ProcGroupInfoP[groupid].scounter.count = nparallel;
  1053.        ProcGroupInfoP[groupid].nprocess = nparallel;
  1054. #ifdef        TCOP_SLAVESYNCDEBUG
  1055.        {
  1056.            char procs[100];
  1057.            p = ProcGroupLocalInfoP[groupid].memberProc;
  1058.            sprintf(procs, "%d", p->pid);
  1059.            for (p = p->next; p != NULL; p = p->next)
  1060.            sprintf(procs+strlen(procs), ",%d", p->pid);
  1061.            SLAVE2_elog(DEBUG, "master to wake up procgroup %d {%s} for",
  1062.                groupid, procs);
  1063.            set_query_range_table(parsetree);
  1064.            pplan(plan);
  1065.            fprintf(stderr, "\n");
  1066.         }
  1067. #endif
  1068.        wakeupProcGroup(groupid);
  1069.        set_frag_is_inprocess(fragment, true);
  1070.        /* ---------------
  1071.         * restore the original result relation descriptor
  1072.         * ---------------
  1073.         */
  1074.        parse_tree_result_relation(parsetree) = finalResultRelation;
  1075.      }
  1076.  
  1077.        /* ------------
  1078.     * if there are extra processors lying around,
  1079.     * dynamically adjust degrees of parallelism of
  1080.     * fragments that are already in process.
  1081.        if (NumberOfFreeSlaves > 0 && AdjustParallelismEnabled) {
  1082.         AdjustParallelism(NumberOfFreeSlaves, -1);
  1083.      }
  1084.     * ------------
  1085.     */
  1086.  
  1087.        /* ----------------
  1088.     * wait for some process group to complete execution
  1089.     * ----------------
  1090.     */
  1091. MasterWait:
  1092.        P_Finished();
  1093.  
  1094.        /* --------------
  1095.     * some process group has finished processing a fragment,
  1096.     * find that group
  1097.     * --------------
  1098.     */
  1099.        groupid = getFinishedProcGroup();
  1100.        if (ProcGroupInfoP[groupid].status == PARADJPENDING) {
  1101.        /* -----------------------------
  1102.         * master decided earlier than process group, groupid's parallelism
  1103.         * was going to be reduced.  now the adjustment point is reached.
  1104.         * master is ready to collect those slaves spared from the
  1105.         * group.
  1106.         * -----------------------------
  1107.         */
  1108.        ProcessNode *p, *prev, *nextp;
  1109.        int newparallel = ProcGroupInfoP[groupid].newparallel;
  1110.        List tmpreldesclist = LispNil;
  1111.        int nfreeslave;
  1112.  
  1113.          SLAVE1_elog(DEBUG, "master woken up by paradjpending process group %d",
  1114.                groupid);
  1115.        ProcGroupInfoP[groupid].status = WORKING;
  1116.        tempRelationDescList = 
  1117.                  ProcGroupLocalInfoP[groupid].resultTmpRelDescList;
  1118.        prev = NULL;
  1119.        for (p = ProcGroupLocalInfoP[groupid].memberProc;
  1120.         p != NULL; p = nextp) {
  1121.            nextp = p->next;
  1122.            if (SlaveInfoP[p->pid].groupPid >= newparallel) {
  1123.            /* ------------------------
  1124.             * before freeing this slave, we have to save its
  1125.             * resultTmpRelDesc somewhere.
  1126.             * -------------------------
  1127.             */
  1128. #ifndef PALLOC_DEBUG
  1129.             tempRelationDesc = CopyRelDescUsing(
  1130.                         SlaveInfoP[p->pid].resultTmpRelDesc,
  1131.                         palloc);
  1132. #else
  1133.             tempRelationDesc = CopyRelDescUsing(
  1134.                         SlaveInfoP[p->pid].resultTmpRelDesc,
  1135.                         palloc_debug);
  1136. #endif
  1137.             tempRelationDescList = nappend1(tempRelationDescList,
  1138.                             (LispValue)tempRelationDesc);
  1139.             if (prev == NULL) {
  1140.             ProcGroupLocalInfoP[groupid].memberProc = nextp;
  1141.               }
  1142.             else {
  1143.             prev->next = nextp;
  1144.               }
  1145.             freeSlave(p->pid);
  1146.           }
  1147.          }
  1148.             ProcGroupLocalInfoP[groupid].resultTmpRelDescList =
  1149.                             tempRelationDescList;
  1150.         /* --------------------------
  1151.          * now we have to adjust nprocess and countdown of group groupid
  1152.          * --------------------------
  1153.          */
  1154.         nfreeslave = ProcGroupInfoP[groupid].nprocess - 
  1155.                  ProcGroupInfoP[groupid].newparallel;
  1156.         ProcGroupInfoP[groupid].nprocess = 
  1157.                 ProcGroupInfoP[groupid].newparallel;
  1158.         ProcGroupInfoP[groupid].countdown -= nfreeslave;
  1159.         /* ---------------------------
  1160.          * adjust parallelism with the freed slaves
  1161.          * ---------------------------
  1162.          */
  1163.         if (MasterSchedulingInfoD.ioBoundGroupId == groupid)
  1164.             AdjustParallelism(nfreeslave, 
  1165.                       MasterSchedulingInfoD.cpuBoundGroupId);
  1166.         else
  1167.             AdjustParallelism(nfreeslave,
  1168.                       MasterSchedulingInfoD.ioBoundGroupId);
  1169.         if (ProcGroupInfoP[groupid].countdown == 0) {
  1170.             /* -----------------------
  1171.              * this means that this group has actually finished
  1172.              * go down to process the group
  1173.              * -----------------------
  1174.              */
  1175.            }
  1176.         else {
  1177.             /* ------------------------
  1178.              * otherwise, go to P_Finished()
  1179.              * ------------------------
  1180.              */
  1181.             goto MasterWait;
  1182.           }
  1183.       }
  1184.        SLAVE1_elog(DEBUG, "master woken up by finished process group %d", 
  1185.            groupid);
  1186.        fragment = ProcGroupLocalInfoP[groupid].fragment;
  1187.        if (fragment == MasterSchedulingInfoD.ioBoundFrag) {
  1188.        MasterSchedulingInfoD.ioBoundFrag = NULL;
  1189.        MasterSchedulingInfoD.ioBoundGroupId = -1;
  1190.      }
  1191.        else if (fragment == MasterSchedulingInfoD.cpuBoundFrag) {
  1192.        MasterSchedulingInfoD.cpuBoundFrag = NULL;
  1193.        MasterSchedulingInfoD.cpuBoundGroupId = -1;
  1194.      }
  1195.        nparallel = get_frag_parallel(fragment);
  1196.        plan = get_frag_root(fragment);
  1197.        parentPlan = get_frag_parent_op(fragment);
  1198.        parentFragment = (Fragment)get_frag_parent_frag(fragment);
  1199.        /* ---------------
  1200.     * delete the finished fragment from the subtree list of its
  1201.     * parent fragment
  1202.     * ---------------
  1203.     */
  1204.        if (parentFragment == NULL)
  1205.       subtrees = LispNil;
  1206.        else {
  1207.       subtrees = get_frag_subtrees(parentFragment);
  1208.       set_frag_subtrees(parentFragment,
  1209.                 nLispRemove(subtrees, (LispValue)fragment));
  1210.      }
  1211.        /* ----------------
  1212.     * let the parent fragment know where the result is
  1213.     * ----------------
  1214.     */
  1215.        if (ExecIsHash(plan)) {
  1216.        /* ----------------
  1217.         *  if it is hashjoin, let the parent know where the hash table is
  1218.         * ----------------
  1219.         */
  1220.        set_hashjointable((HashJoin)parentPlan, hashtable);
  1221.        set_hashjointablekey((HashJoin)parentPlan, hashTableMemoryKey);
  1222.        set_hashjointablesize((HashJoin)parentPlan, hashTableMemorySize);
  1223.        set_hashdone((HashJoin)parentPlan, true);
  1224.       }
  1225.        else {
  1226.        List unionplans = LispNil;
  1227.        /* ------------------
  1228.         * if it is ScanTemps node, clean up the temporary relations
  1229.         * they are not needed any more
  1230.         * ------------------
  1231.         */
  1232.        if (ExecIsScanTemps(plan)) {
  1233.            Relation tempreldesc;
  1234.            List    tempRelDescs;
  1235.            LispValue y;
  1236.  
  1237.            tempRelDescs = get_temprelDescs((ScanTemps)plan);
  1238.            foreach (y, tempRelDescs) {
  1239.            tempreldesc = (Relation)CAR(y);
  1240.            ReleaseTmpRelBuffers(tempreldesc);
  1241.            if (FileNameUnlink(
  1242.               relpath((char*)&(tempreldesc->rd_rel->relname))) < 0)
  1243.                elog(WARN, "ExecEndScanTemp: unlink: %m");
  1244.         }
  1245.          }
  1246.        if (parentPlan == NULL /* WWW && nparallel == 1 */)
  1247.           /* in this case the whole plan has been finished */
  1248.           fraglist = nLispRemove(fraglist, (LispValue)fragment);
  1249.        else {
  1250.           /* -----------------
  1251.            *  make a ScanTemps node to let the parent collect the tuples
  1252.            *  from a set of temporary relations
  1253.            * -----------------
  1254.            */
  1255.           tempRelationDescList = 
  1256.             ProcGroupLocalInfoP[groupid].resultTmpRelDescList;
  1257.           p = ProcGroupLocalInfoP[groupid].memberProc;
  1258.           for (p = ProcGroupLocalInfoP[groupid].memberProc;
  1259.            p != NULL;
  1260.            p = p->next) {
  1261.          shmTempRelationDesc = SlaveInfoP[p->pid].resultTmpRelDesc;
  1262. #ifndef PALLOC_DEBUG             
  1263.          tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
  1264.                              palloc);
  1265. #else
  1266.          tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
  1267.                              palloc_debug);
  1268. #endif PALLOC_DEBUG             
  1269.          tempRelationDescList = nappend1(tempRelationDescList, 
  1270.                          (LispValue)tempRelationDesc);
  1271.          }
  1272.           scantempNode = RMakeScanTemps();
  1273.           set_qptargetlist((Plan)scantempNode, get_qptargetlist(plan));
  1274.           set_temprelDescs(scantempNode, tempRelationDescList);
  1275.           if (parentPlan == NULL) {
  1276.          set_frag_root(fragment, (Plan)scantempNode);
  1277.          set_frag_subtrees(fragment, LispNil);
  1278.          set_fragment((Plan)scantempNode,-1);
  1279.                         /*means end of parallelism */
  1280.          set_frag_is_inprocess(fragment, false);
  1281.          set_frag_iorate(fragment, 0.0);
  1282.         }
  1283.           else {
  1284.           if (plan == (Plan)get_lefttree(parentPlan)) {
  1285.          set_lefttree(parentPlan, (PlanPtr)scantempNode);
  1286.         }
  1287.           else {
  1288.          set_righttree(parentPlan, (PlanPtr)scantempNode);
  1289.         }
  1290.           set_fragment((Plan)scantempNode, get_fragment(parentPlan));
  1291.           }
  1292.        }
  1293.          }
  1294.        /* -----------------
  1295.     *  free shared memory
  1296.     *  free the finished processed group
  1297.     * -----------------
  1298.     */
  1299.        ProcGroupSMClean(groupid);
  1300.        freeProcGroup(groupid);
  1301.      }
  1302. }
  1303.